home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import os
- import sys
- import signal
-
- try:
- from popen2 import Popen3
- except ImportError:
- Popen3 = None
-
- from zope.interface import implements
-
- try:
- import subunit
- except ImportError:
- subunit = None
-
- from twisted.trial.unittest import TestCase as TrialTestCase
- from twisted.python.components import Componentized
- from twisted.internet import defer
- from twisted.web import http
- from twisted.protocols.basic import LineReceiver
- from formless import iformless
- from nevow import inevow, context, athena, loaders, tags, appserver
- from nevow.jsutil import findJavascriptInterpreter, generateTestScript
-
- class FakeChannel:
-
- def __init__(self, site):
- self.site = site
-
-
-
- class FakeSite:
- pass
-
-
- class FakeSession(Componentized):
- implements(inevow.ISession)
-
- def __init__(self, avatar):
- Componentized.__init__(self)
- self.avatar = avatar
- self.uid = 12345
-
-
- def getLoggedInRoot(self):
- return self.avatar
-
-
- fs = FakeSession(None)
-
- class FakeRequest(Componentized):
- '''
- Implementation of L{inevow.IRequest} which is convenient to use in unit
- tests.
-
- @ivar lastModified: The value passed to L{setLastModified} or C{None} if
- that method has not been called.
-
- @type accumulator: C{str}
- @ivar accumulator: The bytes written to the response body.
-
- @type deferred: L{Deferred}
- @ivar deferred: The deferred which represents rendering of the response
- to this request. This is basically an implementation detail of
- L{NevowRequest}. Application code should probably never use this.
-
- @ivar _appRootURL: C{None} or the object passed to L{rememberRootURL}.
- '''
- implements(inevow.IRequest)
- fields = None
- failure = None
- context = None
- redirected_to = None
- lastModified = None
- content = ''
- method = 'GET'
- code = http.OK
- deferred = None
- accumulator = ''
- _appRootURL = None
-
- def __init__(self, headers = None, args = None, avatar = None, uri = '/', currentSegments = None, cookies = None, user = '', password = '', isSecure = False):
- '''Create a FakeRequest instance.
-
- headers:
- dict of headers
- args:
- dict of args
- avatar:
- avatar to pass to the FakeSession instance
- uri:
- request URI
- currentSegments:
- list of segments that have "already been located"
- cookies:
- dict of cookies
- user:
- username (like in http auth)
- password:
- password (like in http auth)
- isSecure:
- whether this request represents an HTTPS url
- '''
- Componentized.__init__(self)
- self.uri = uri
- self.prepath = []
- postpath = uri.split('?')[0]
- if not postpath.startswith('/'):
- raise AssertionError
- self.postpath = postpath[1:].split('/')
- self.headers = { }
- if headers:
- for k, v in headers.iteritems():
- self.setHeader(k, v)
-
-
- if not args:
- pass
- self.args = { }
- self.sess = FakeSession(avatar)
- self.site = FakeSite()
- self.received_headers = { }
- if cookies is not None:
- self.cookies = cookies
- else:
- self.cookies = { }
- self.user = user
- self.password = password
- self.secure = isSecure
- self.deferred = defer.Deferred()
-
-
- def URLPath(self):
- url = url
- import nevow
- return url.URL.fromString('')
-
-
- def getSession(self):
- return self.sess
-
-
- def registerProducer(self, producer, streaming):
- '''
- Synchronously cause the given producer to produce all of its data.
-
- This will not work with push producers. Do not use it with them.
- '''
- keepGoing = [
- None]
- self.unregisterProducer = keepGoing.pop
- while keepGoing:
- producer.resumeProducing()
- del self.unregisterProducer
-
-
- def v():
-
- def get(self):
- return self.accumulator
-
- return (get,)
-
- v = property(*v())
-
- def write(self, bytes):
- '''
- Accumulate the given bytes as part of the response body.
-
- @type bytes: C{str}
- '''
- self.accumulator += bytes
-
- finished = False
-
- def finishRequest(self, success):
- self.finished = True
-
-
- def finish(self):
- self.deferred.callback('')
-
-
- def getHeader(self, key):
- return self.headers.get(key.lower())
-
-
- def setHeader(self, key, val):
- self.headers[key.lower()] = val
-
-
- def redirect(self, url):
- self.redirected_to = url
-
-
- def processingFailed(self, f):
- self.failure = f
-
-
- def setResponseCode(self, code):
- self.code = code
-
-
- def setLastModified(self, when):
- self.lastModified = when
-
-
- def prePathURL(self):
- '''
- The absolute URL up until the last handled segment of this request.
-
- @rtype: C{str}.
- '''
- if not self.getHeader('host'):
- pass
- return 'http://%s/%s' % ('localhost', '/'.join(self.prepath))
-
-
- def getClientIP(self):
- return '127.0.0.1'
-
-
- def addCookie(self, k, v, expires = None, domain = None, path = None, max_age = None, comment = None, secure = None):
- '''
- Set a cookie for use in subsequent requests.
- '''
- self.cookies[k] = v
-
-
- def getCookie(self, k):
- '''
- Fetch a cookie previously set.
- '''
- return self.cookies.get(k)
-
-
- def getUser(self):
- '''
- Returns the HTTP auth username.
- '''
- return self.user
-
-
- def getPassword(self):
- '''
- Returns the HTTP auth password.
- '''
- return self.password
-
-
- def getRootURL(self):
- '''
- Return the previously remembered URL.
- '''
- return self._appRootURL
-
-
- def rememberRootURL(self, url = None):
- '''
- For compatibility with appserver.NevowRequest.
- '''
- if url is None:
- raise NotImplementedError('Default URL remembering logic is not implemented.')
- url is None
- self._appRootURL = url
-
-
- def isSecure(self):
- '''
- Returns whether this is an HTTPS request or not.
- '''
- return self.secure
-
-
-
- class TestCase(TrialTestCase):
- hasBools = sys.version_info >= (2, 3)
- _assertions = 0
-
- def failUnlessSubstring(self, containee, container, msg = None):
- self._assertions += 1
- if container.find(containee) == -1:
- if not msg:
- pass
- self.fail('%r not in %r' % (containee, container))
-
-
-
- def failIfSubstring(self, containee, container, msg = None):
- self._assertions += 1
- if container.find(containee) != -1:
- if not msg:
- pass
- self.fail('%r in %r' % (containee, container))
-
-
- assertSubstring = failUnlessSubstring
- assertNotSubstring = failIfSubstring
-
- def assertNotIdentical(self, first, second, msg = None):
- self._assertions += 1
- if first is second:
- if not msg:
- pass
- self.fail('%r is %r' % (first, second))
-
-
-
- def failIfIn(self, containee, container, msg = None):
- self._assertions += 1
- if containee in container:
- if not msg:
- pass
- self.fail('%r in %r' % (containee, container))
-
-
-
- def assertApproximates(self, first, second, tolerance, msg = None):
- self._assertions += 1
- if abs(first - second) > tolerance:
- if not msg:
- pass
- self.fail('%s ~== %s' % (first, second))
-
-
-
- if not hasattr(TrialTestCase, 'mktemp'):
-
- def mktemp(self):
- import tempfile
- return tempfile.mktemp()
-
- TestCase.mktemp = mktemp
-
-
- class AccumulatingFakeRequest(FakeRequest):
- '''
- I am a fake IRequest that is also a stub implementation of
- IFormDefaults.
-
- This class is named I{accumulating} for historical reasons only. You
- probably want to ignore this and use L{FakeRequest} instead.
- '''
- implements(iformless.IFormDefaults)
-
- def __init__(self, *a, **kw):
- FakeRequest.__init__(self, *a, **kw)
- self.d = defer.Deferred()
-
-
- def getDefault(self, key, context):
- return ''
-
-
- def remember(self, object, interface):
- pass
-
-
-
- class FragmentWrapper(athena.LivePage):
- '''
- I wrap myself around an Athena fragment, providing a minimal amount of html
- scaffolding in addition to an L{athena.LivePage}.
- '''
- docFactory = loaders.stan(tags.html[tags.body[tags.directive('fragment')]])
-
- def __init__(self, f):
- super(FragmentWrapper, self).__init__()
- self.f = f
-
-
- def render_fragment(self, ctx, data):
- self.f.setFragmentParent(self)
- return self.f
-
-
-
- def renderLivePage(res, topLevelContext = context.WebContext, reqFactory = FakeRequest):
- '''
- Render the given LivePage resource, performing LivePage-specific cleanup.
- Return a Deferred which fires when it has rendered.
- '''
- D = renderPage(res, topLevelContext, reqFactory)
- return (D.addCallback,)((lambda x: (res._messageDeliverer.close(), x)[1]))
-
-
- def renderPage(res, topLevelContext = context.WebContext, reqFactory = FakeRequest):
- '''
- Render the given resource. Return a Deferred which fires when it has
- rendered.
- '''
- req = reqFactory()
- ctx = topLevelContext(tag = res)
- ctx.remember(req, inevow.IRequest)
- render = appserver.NevowRequest(None, True).gotPageContext
- result = render(ctx)
- (result.addCallback,)((lambda x: req.accumulator))
- return result
-
-
- class NotSupported(Exception):
- '''
- Raised by L{JavaScriptTestCase} if the installation lacks a certain
- required feature.
- '''
- pass
-
-
- class TestProtocolLineReceiverServer(LineReceiver):
- '''
- Subunit protocol which is also a Twisted LineReceiver so that it
- includes line buffering logic.
- '''
- delimiter = '\n'
-
- def __init__(self, proto):
- self.proto = proto
-
-
- def lineReceived(self, line):
- """
- Forward the line on to the subunit protocol's lineReceived method,
- which expects it to be newline terminated.
- """
- self.proto.lineReceived(line + '\n')
-
-
-
- class JavaScriptTestCase(TrialTestCase):
-
- def __init__(self, methodName = 'runTest'):
- TrialTestCase.__init__(self, methodName)
- self.testMethod = getattr(self, methodName)
-
-
- def findJavascriptInterpreter(self):
- '''
- @see L{nevow.jsutil.findJavascriptInterpreter}
- '''
- return findJavascriptInterpreter()
-
-
- def checkDependencies(self):
- '''
- Check that all the dependencies of the test are satisfied.
-
- @raise NotSupported: If any one of the dependencies is not satisfied.
- '''
- js = self.findJavascriptInterpreter()
- if js is None:
- raise NotSupported('Could not find JavaScript interpreter')
- js is None
- if subunit is None:
- raise NotSupported("Could not import 'subunit'")
- subunit is None
- if Popen3 is None:
- raise NotSupported("Could not import 'popen2.Popen3'")
- Popen3 is None
- for name in [
- 'WEXITSTATUS',
- 'WIFSIGNALED',
- 'WTERMSIG']:
- if getattr(os, name, None) is None:
- raise NotSupported('os.%s unavailable' % (name,))
- getattr(os, name, None) is None
-
-
-
- def _writeToTemp(self, contents):
- fname = self.mktemp()
- fd = file(fname, 'w')
-
- try:
- fd.write(contents)
- finally:
- fd.close()
-
- return fname
-
-
- def createSource(self, testModule):
- '''
- Return a string of JavaScript source code which, when executed, will
- run the JavaScript unit tests defined in the given module.
-
- @type testModule: C{str}
- @param testModule: The JavaScript module name which contains the
- tests to run.
-
- @rtype: C{str}
- '''
- js = '\n// import Divmod.UnitTest\n// import %(module)s\n\nDivmod.UnitTest.runRemote(Divmod.UnitTest.loadFromModule(%(module)s));\n' % {
- 'module': testModule }
- return js
-
-
- def makeScript(self, testModule):
- '''
- Write JavaScript source for executing the JavaScript unit tests in
- the given JavaScript module to a file and return the name of that
- file.
-
- @type testModule: C{str}
- @param testModule: The JavaScript module name which contains the
- tests to run.
-
- @rtype: C{str}
- '''
- jsfile = self._writeToTemp(self.createSource(testModule))
- scriptFile = self._writeToTemp(generateTestScript(jsfile))
- return scriptFile
-
-
- def _runWithSigchild(self, f, *a, **kw):
- '''
- Run the given function with an alternative SIGCHLD handler.
- '''
- oldHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-
- try:
- return f(*a, **kw)
- finally:
- signal.signal(signal.SIGCHLD, oldHandler)
-
-
-
- def run(self, result):
-
- try:
- self.checkDependencies()
- except NotSupported:
- e = None
- result.startTest(self)
- result.addSkip(self, str(e))
- result.stopTest(self)
- return None
-
- js = self.findJavascriptInterpreter()
- script = self.makeScript(self.testMethod())
- server = subunit.TestProtocolServer(result)
- protocol = TestProtocolLineReceiverServer(server)
-
- def run():
- child = Popen3([
- js,
- script])
- while True:
- bytes = child.fromchild.read(4096)
- if bytes:
- protocol.dataReceived(bytes)
- continue
- break
- exitCode = child.wait()
- if os.WIFSIGNALED(exitCode):
- result.addError(self, (Exception, 'JavaScript interpreter exited due to signal %d' % (os.WTERMSIG(exitCode),), None))
- else:
- exitStatus = os.WEXITSTATUS(exitCode)
- if exitStatus:
- result.addError(self, (Exception, 'JavaScript interpreter had error exit: %d' % (exitStatus,), None))
-
-
- self._runWithSigchild(run)
-
-
-
- def setJavascriptInterpreterOrSkip(testCase):
- """
- If we're unable to find a javascript interpreter (currently we only look
- for smjs or js) then set the C{skip} attribute on C{testCase}. Otherwise
- assign the path to the interpreter executable to
- C{testCase.javascriptInterpreter}
- """
- script = findJavascriptInterpreter()
- if script is None:
- testCase.skip = 'No JavaScript interpreter available.'
- else:
- testCase.javascriptInterpreter = script
-
-